package com.carol.bigdata.utils

import java.util
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.compress.Compression

import scala.collection.JavaConverters._
import scala.collection.mutable

object HBaseUtil {

    // NOTE(review): this configuration is a shared, mutable singleton reused by
    // every read/write method below. Concurrent calls with different
    // hbaseParams (or mixing input and output jobs) can clobber each other's
    // settings — confirm callers invoke these methods sequentially.
    val conf: Configuration = HBaseConfiguration.create()
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])

    /** Merges the given key/value pairs into the shared job configuration. */
    def setConf(hbaseParams: Map[String, String]): Unit = {
        for ((key, value) <- hbaseParams) {
            jobConf.set(key, value)
        }
    }

    /**
     * Writes event records (one RDD, single column family) to HBase.
     * Row keys are derived from each record via `rowkeyRule`; the "rid" and
     * "type" fields are stripped before writing.
     *
     * @param cf        target column family (default "value")
     * @param timeField name of the time field — currently unused, kept for
     *                  interface compatibility
     */
    def writeEvent(hbaseParams: Map[String, String],
                   result: RDD[Map[String, String]],
                   table: String,
                   rowkeyRule: String,
                   cf: String = "value",
                   timeField: String = "timestamp"): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
        result.map(x => {
            val record = x - "rid" - "type"
            val rowKey = FuncUtil.getRowKey(record, rowkeyRule)
            // Build the Put for this row.
            val put = new Put(Bytes.toBytes(rowKey))
            for ((k, v) <- record) {
                put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(k), Bytes.toBytes(v))
            }
            // put.setDurability(Durability.SKIP_WAL)
            // FIX: use Bytes.toBytes (always UTF-8) instead of String.getBytes,
            // which depends on the JVM default charset and could diverge from
            // the bytes used for the Put's row key above.
            (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
        }).saveAsHadoopDataset(jobConf)
        println(s"===> write to HBase table: ${table}")
    }

    /**
     * Writes event records where each column carries its own column family:
     * the RDD maps column name -> (column family, value).
     *
     * @param timeField currently unused, kept for interface compatibility
     */
    def writeMultiCFEvent(hbaseParams: Map[String, String],
                          result: RDD[Map[String, (String, String)]],
                          table: String,
                          rowkeyRule: String,
                          timeField: String = "timestamp"): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
        result.map(x => {
            val record = x - "rid" - "type"
            val rowKey = FuncUtil.getRowKeyFromCFValuePair(record, rowkeyRule)
            // Build the Put for this row.
            val put = new Put(Bytes.toBytes(rowKey))
            for ((col, (cf, v)) <- record) {
                put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(col), Bytes.toBytes(v))
            }
            // put.setDurability(Durability.SKIP_WAL)
            // FIX: Bytes.toBytes instead of charset-dependent String.getBytes.
            (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
        }).saveAsHadoopDataset(jobConf)
        println(s"===> write to HBase table: ${table}")
    }

    /**
     * Writes several (keys, values) RDDs into a key CF and a value CF.
     * The row key is the underscore-joined key list; `columnList(i)` names the
     * value columns of `rddList(i)`.
     */
    def writeManyRDD2KvCF(hbaseParams: Map[String, String],
                          table: String,
                          rddList: List[RDD[(List[String], List[String])]],
                          keyCF: String,
                          valueCf: String,
                          keyColumn: List[String],
                          columnList: List[List[String]]): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
        for ((columns, rdd) <- columnList.zip(rddList)) {
            rdd.map(x => {
                // Row key: underscore-joined key fields.
                val rowkey = x._1.mkString("_")
                val put = new Put(Bytes.toBytes(rowkey))
                for ((ck, value) <- keyColumn.zip(x._1)) {
                    put.addColumn(Bytes.toBytes(keyCF), Bytes.toBytes(ck), Bytes.toBytes(value.toString))
                }
                for ((cv, value) <- columns.zip(x._2)) {
                    put.addColumn(Bytes.toBytes(valueCf), Bytes.toBytes(cv), Bytes.toBytes(value.toString))
                }
                // FIX: Bytes.toBytes instead of charset-dependent String.getBytes.
                (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
            }).saveAsHadoopDataset(jobConf)
        }
    }

    /**
     * Same as [[writeManyRDD2KvCF]] but the value lists hold Ints; values are
     * stored as their decimal string representation.
     */
    def writeIntManyRDD2KvCF(hbaseParams: Map[String, String],
                             table: String,
                             rddList: List[RDD[(List[String], List[Int])]],
                             keyCF: String,
                             valueCf: String,
                             keyColumn: List[String],
                             columnList: List[List[String]]): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
        for ((columns, rdd) <- columnList.zip(rddList)) {
            rdd.map(x => {
                // Row key: underscore-joined key fields.
                val rowkey = x._1.mkString("_")
                val put = new Put(Bytes.toBytes(rowkey))
                for ((ck, value) <- keyColumn.zip(x._1)) {
                    put.addColumn(Bytes.toBytes(keyCF), Bytes.toBytes(ck), Bytes.toBytes(value.toString))
                }
                for ((cv, value) <- columns.zip(x._2)) {
                    put.addColumn(Bytes.toBytes(valueCf), Bytes.toBytes(cv), Bytes.toBytes(value.toString))
                }
                // FIX: Bytes.toBytes instead of charset-dependent String.getBytes.
                (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
            }).saveAsHadoopDataset(jobConf)
        }
    }


    /**
     * Writes one (keys, Int values) RDD into a key CF and a value CF.
     * The row key is the underscore-joined key list.
     */
    def writeIntRDD2KvCF(hbaseParams: Map[String, String],
                         table: String,
                         rdd: RDD[(List[String], List[Int])],
                         keyCF: String,
                         valueCf: String,
                         keyColumn: List[String],
                         columnList: List[String]): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
        rdd.map(x => {
            // Row key: underscore-joined key fields.
            val rowkey = x._1.mkString("_")
            val put = new Put(Bytes.toBytes(rowkey))
            // Key columns go to keyCF.
            for ((ck, value) <- keyColumn.zip(x._1)) {
                put.addColumn(Bytes.toBytes(keyCF), Bytes.toBytes(ck), Bytes.toBytes(value.toString))
            }
            // Value columns go to valueCf.
            for ((cv, value) <- columnList.zip(x._2)) {
                put.addColumn(Bytes.toBytes(valueCf), Bytes.toBytes(cv), Bytes.toBytes(value.toString))
            }
            // FIX: Bytes.toBytes instead of charset-dependent String.getBytes.
            (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
        }).saveAsHadoopDataset(jobConf)
    }

    /**
     * Writes one (keys, String values) RDD into a key CF and a value CF.
     * The row key is the underscore-joined key list.
     */
    def writeRDD2KvCF(hbaseParams: Map[String, String],
                      table: String,
                      rdd: RDD[(List[String], List[String])],
                      keyCF: String,
                      valueCf: String,
                      keyColumn: List[String],
                      columnList: List[String]): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
        rdd.map(x => {
            // Row key: underscore-joined key fields.
            val rowkey = x._1.mkString("_")
            val put = new Put(Bytes.toBytes(rowkey))
            // Key columns go to keyCF.
            for ((ck, value) <- keyColumn.zip(x._1)) {
                put.addColumn(Bytes.toBytes(keyCF), Bytes.toBytes(ck), Bytes.toBytes(value.toString))
            }
            // Value columns go to valueCf.
            for ((cv, value) <- columnList.zip(x._2)) {
                put.addColumn(Bytes.toBytes(valueCf), Bytes.toBytes(cv), Bytes.toBytes(value.toString))
            }
            // FIX: Bytes.toBytes instead of charset-dependent String.getBytes.
            (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
        }).saveAsHadoopDataset(jobConf)
    }


    /**
     * Writes one RDD whose value part spans several column families:
     * `x._2(i)` holds the values for family `valueCfList(i)` with column names
     * `columnList(i)`. The row key is the underscore-joined key list.
     */
    def writeRDD2ManyCF(hbaseParams: Map[String, String],
                        table: String,
                        rdd: RDD[(List[String], List[List[String]])],
                        keyCF: String,
                        keyColumn: List[String],
                        valueCfList: List[String],
                        columnList: List[List[String]]): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
        rdd.map(x => {
            // Row key: underscore-joined key fields.
            val rowkey = x._1.mkString("_")
            val put = new Put(Bytes.toBytes(rowkey))
            for ((ck, value) <- keyColumn.zip(x._1)) {
                put.addColumn(Bytes.toBytes(keyCF), Bytes.toBytes(ck), Bytes.toBytes(value.toString))
            }
            for (((valueCf, columns), values) <- valueCfList.zip(columnList).zip(x._2)) {
                for ((cv, value) <- columns.zip(values)) {
                    put.addColumn(Bytes.toBytes(valueCf), Bytes.toBytes(cv), Bytes.toBytes(value.toString))
                }
            }
            // FIX: Bytes.toBytes instead of charset-dependent String.getBytes.
            (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
        }).saveAsHadoopDataset(jobConf)
    }


    /**
     * Same as [[writeRDD2ManyCF]] but the value lists hold Ints; values are
     * stored as their decimal string representation.
     */
    def writeIntRDD2ManyCF(hbaseParams: Map[String, String],
                           table: String,
                           rdd: RDD[(List[String], List[List[Int]])],
                           keyCF: String,
                           keyColumn: List[String],
                           valueCfList: List[String],
                           columnList: List[List[String]]): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
        rdd.map(x => {
            // Row key: underscore-joined key fields.
            val rowkey = x._1.mkString("_")
            val put = new Put(Bytes.toBytes(rowkey))
            for ((ck, value) <- keyColumn.zip(x._1)) {
                put.addColumn(Bytes.toBytes(keyCF), Bytes.toBytes(ck), Bytes.toBytes(value.toString))
            }
            for (((valueCf, columns), values) <- valueCfList.zip(columnList).zip(x._2)) {
                for ((cv, value) <- columns.zip(values)) {
                    put.addColumn(Bytes.toBytes(valueCf), Bytes.toBytes(cv), Bytes.toBytes(value.toString))
                }
            }
            // FIX: Bytes.toBytes instead of charset-dependent String.getBytes.
            (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
        }).saveAsHadoopDataset(jobConf)
    }


    /**
     * Writes one RDD where each value column has its own target column family:
     * `valueColumn(i)` is written to family `columnCF(i)`.
     * The row key is the underscore-joined key list.
     */
    def writeRDD2KVColumn(hbaseParams: Map[String, String],
                          table: String,
                          rdd: RDD[(List[String], List[String])],
                          keyCF: String,
                          keyColumn: List[String],
                          valueColumn: List[String],
                          columnCF: List[String]): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
        rdd.map(x => {
            // Row key: underscore-joined key fields.
            val rowkey = x._1.mkString("_")
            val put = new Put(Bytes.toBytes(rowkey))
            for ((ck, value) <- keyColumn.zip(x._1)) {
                put.addColumn(Bytes.toBytes(keyCF), Bytes.toBytes(ck), Bytes.toBytes(value.toString))
            }
            for (((column, cf), value) <- valueColumn.zip(columnCF).zip(x._2)) {
                put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value.toString))
            }
            // FIX: Bytes.toBytes instead of charset-dependent String.getBytes.
            (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
        }).saveAsHadoopDataset(jobConf)
    }


    /**
     * Writes one (keys, String values) RDD into a single column family.
     * Both key columns and value columns land in `cf`; the row key is the
     * underscore-joined key list.
     */
    def writeRDD2CF(hbaseParams: Map[String, String],
                    result: RDD[(List[String], List[String])],
                    table: String,
                    keyColumn: List[String],
                    valueColumn: List[String],
                    cf: String = "value"): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
        result.map(x => {
            // Row key: underscore-joined key fields.
            val rowkey = x._1.mkString("_")
            val put = new Put(Bytes.toBytes(rowkey))
            for ((ck, value) <- keyColumn.zip(x._1)) {
                put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(ck), Bytes.toBytes(value.toString))
            }
            for ((cv, value) <- valueColumn.zip(x._2)) {
                put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cv), Bytes.toBytes(value.toString))
            }
            // FIX: Bytes.toBytes instead of charset-dependent String.getBytes.
            (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
        }).saveAsHadoopDataset(jobConf)
    }


    /**
     * Same as [[writeRDD2CF]] but the value list holds Ints; values are stored
     * as their decimal string representation.
     */
    def writeIntRDD2CF(hbaseParams: Map[String, String],
                       result: RDD[(List[String], List[Int])],
                       table: String,
                       keyColumn: List[String],
                       valueColumn: List[String],
                       cf: String = "value"): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
        result.map(x => {
            // Row key: underscore-joined key fields.
            val rowkey = x._1.mkString("_")
            val put = new Put(Bytes.toBytes(rowkey))
            for ((ck, value) <- keyColumn.zip(x._1)) {
                put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(ck), Bytes.toBytes(value.toString))
            }
            for ((cv, value) <- valueColumn.zip(x._2)) {
                put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cv), Bytes.toBytes(value.toString))
            }
            // FIX: Bytes.toBytes instead of charset-dependent String.getBytes.
            (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
        }).saveAsHadoopDataset(jobConf)
    }


    /**
     * Scans an HBase table and returns the requested columns as a list per row.
     *
     * @param spark      Spark session used to create the input RDD
     * @param table      table to scan
     * @param cf         column family to read
     * @param columns    columns to read; the one equal to `timeColumn` is
     *                   normalized via FuncUtil.getStatDay
     * @param rowkeys    [id] for a prefix scan, or [startTime, endTime) for a
     *                   range scan (see [[getScan]])
     * @param timeMode   one of Day / Hour / Min
     * @return one List[String] per row, in `columns` order; missing cells
     *         become null (Bytes.toString of a null value)
     */
    def readAsList(hbaseParams: Map[String, String],
                   spark: SparkSession,
                   table: String,
                   cf: String,
                   columns: List[String],
                   rowkeys: List[String],
                   timeColumn: String = "statday",
                   timeMode: String = "Day", // timeMode ~ [Day, Hour, Min]
                   filterMode: String = "TIME",
                   filterCF: String = "",
                   filterCol: String = "",
                   filterValue: String = ""): RDD[List[String]] = {
        setConf(hbaseParams)
        // Read via Spark's newAPIHadoopRDD.
        val hbaseRDD = getHbaseRDD(spark, table, List(cf), List(columns), rowkeys, filterMode, filterCF, filterCol, filterValue)
        val data: RDD[List[String]] = hbaseRDD.map(_._2).map(x => {
            // IMPROVED: build the row with map instead of repeated O(n) :+= appends.
            columns.map { c =>
                if (c == timeColumn) {
                    val day =
                        if (x.containsNonEmptyColumn(Bytes.toBytes(cf), Bytes.toBytes(c))) Bytes.toString(x.getValue(Bytes.toBytes(cf), Bytes.toBytes(c)))
                        else "0000-00-00 00:00:00.000"
                    FuncUtil.getStatDay(day, timeMode)
                } else
                    Bytes.toString(x.getValue(Bytes.toBytes(cf), Bytes.toBytes(c)))
            }
        })
        data
    }


    /**
     * Scans an HBase table and returns each row as a mutable Map of
     * column -> value. The `timeColumn` cell is normalized and stored under
     * the key "statday".
     *
     * @param cfList     column families to read
     * @param columnList columns per family, index-aligned with `cfList`
     * @param rowkeys    [id] / [startTime, endTime) / [] (all rows)
     */
    def readAsMap(hbaseParams: Map[String, String],
                  spark: SparkSession,
                  table: String, // table name
                  cfList: List[String], // column families
                  columnList: List[List[String]], // columns per family, index-aligned with cfList
                  rowkeys: List[String], // rowkeys: [id] / [startTime,endTime] / [] (empty = all rows)
                  timeColumn: String = "create_time", // time column, normalized to e.g. 2020-10-22
                  timeMode: String = "Day", // timeMode ~ [Day, Hour, Min]
                  filterMode: String = "TIME"): RDD[mutable.Map[String, String]] = {
        setConf(hbaseParams)
        // Read via Spark's newAPIHadoopRDD.
        val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = getHbaseRDD(spark, table, cfList, columnList, rowkeys, filterMode)
        // FIX: removed leftover debug output — hbaseRDD.take(5).foreach(println)
        // forced an extra full scan job on every call, and a per-record
        // println(x) inside the executor map spammed executor logs.
        // Example row: Map(account_id -> 120007, login_platform_id -> 2, account_type -> 0)
        val data: RDD[mutable.Map[String, String]] = hbaseRDD.map(_._2).map(x => {
            val map = mutable.Map[String, String]()
            for ((cf, columns) <- cfList.zip(columnList)) {
                for (c <- columns) {
                    if (c == timeColumn) {
                        val day = {
                            if (x.containsNonEmptyColumn(cf.getBytes(), c.getBytes())) Bytes.toString(x.getValue(cf.getBytes(), c.getBytes()))
                            else "0000-00-00 00:00:00.000"
                        }
                        map.put("statday", FuncUtil.getStatDay(day, timeMode))
                    } else
                        map.put(c, Bytes.toString(x.getValue(cf.getBytes(), c.getBytes())))
                }
            }
            map
        })
        data
    }

    /**
     * Reads every column of the given column families and returns each row as
     * a mutable Map of column -> value (column names from all families are
     * merged into one map; later families overwrite duplicate names).
     */
    def readCFAllAsMap(hbaseParams: Map[String, String],
                       spark: SparkSession,
                       table: String, // table name
                       cfList: List[String],
                       rowkeys: List[String], // rowkeys: [id] / [startTime,endTime] / [] (empty = all rows)
                       filterMode: String = "TIME"
                      ): RDD[mutable.Map[String, String]] = {
        setConf(hbaseParams)
        val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = getHbaseRDD(spark, table, cfList, List(), rowkeys, filterMode)
        val data: RDD[mutable.Map[String, String]] = hbaseRDD.map(_._2).map(x => {
            val map = mutable.Map[String, String]()
            for (cf <- cfList) {
                val fMap: util.NavigableMap[Array[Byte], Array[Byte]] = x.getFamilyMap(Bytes.toBytes(cf))
                for (e <- fMap.entrySet().asScala) {
                    map.put(Bytes.toString(e.getKey), Bytes.toString(e.getValue))
                }
            }
            map
        })
        data
    }


    /**
     * Reads user-profile intermediate data as (keys, (Int values, Map values)),
     * delegating parsing of each column group to FuncUtil.
     */
    def readIntMapRDD(hbaseParams: Map[String, String],
                      spark: SparkSession,
                      table: String, // table name
                      cfList: List[String],
                      rowkeys: List[String],
                      keyColumn: List[String],
                      intColumn: List[String],
                      mapColumn: List[String],
                      filterMode: String = "TIME"): RDD[(List[String], (List[Int], List[Map[String, Int]]))] = {
        val data: RDD[(List[String], (List[Int], List[Map[String, Int]]))] = HBaseUtil
          .readCFAllAsMap(hbaseParams, spark, table, cfList, rowkeys, filterMode)
          .map(x => {
              val keyList: List[String] = FuncUtil.getKey(x, keyColumn)
              val intValueList: List[Int] = FuncUtil.getIntValue(x, intColumn)
              val mapValueList: List[Map[String, Int]] = FuncUtil.getMapValue(x, mapColumn)
              (keyList, (intValueList, mapValueList))
          })
        data
    }

    /**
     * Reads user-profile data as (keys, (int-map values, string-map + map values)),
     * delegating parsing of each column group to FuncUtil.
     *
     * @param valueFunc transformation applied to map-column values before parsing
     */
    def readIntStrMapRDD(hbaseParams: Map[String, String],
                         spark: SparkSession,
                         table: String, // table name
                         cfList: List[String],
                         rowkeys: List[String],
                         keyColumn: List[String],
                         intColumn: List[String],
                         strColumn: List[String],
                         mapColumn: List[String],
                         filterMode: String = "TIME",
                         valueFunc: String => String = FuncUtil.defaultTagDetailFunc
                        ): RDD[(List[String], (List[Map[String, Int]], List[Map[String, Int]]))] = {
        val data = HBaseUtil.readCFAllAsMap(hbaseParams, spark, table, cfList, rowkeys, filterMode)
          .map(x => {
              val keyList: List[String] = FuncUtil.getKey(x, keyColumn)
              val intValueList: List[Map[String, Int]] = FuncUtil.getMapIntValue(x, intColumn)
              val strValueList: List[Map[String, Int]] = FuncUtil.getString2MapValue(x, strColumn)
              val mapValueList: List[Map[String, Int]] = FuncUtil.getMapValue(x, mapColumn, valueFunc)
              (keyList, (intValueList, strValueList ::: mapValueList))
          })
        data
    }


    /**
     * Builds a scan from `rowkeys`/filter settings and materializes it as an
     * RDD of (row key, Result) via Spark's newAPIHadoopRDD.
     * An empty `columnList` selects whole families; otherwise `columnList(i)`
     * restricts the columns read from `cfList(i)`.
     */
    def getHbaseRDD(spark: SparkSession,
                    table: String,
                    cfList: List[String],
                    columnList: List[List[String]],
                    rowkeys: List[String],
                    filterMode: String = "TIME",
                    filterCF: String = "",
                    filterCol: String = "",
                    filterValue: String = ""): RDD[(ImmutableBytesWritable, Result)] = {
        val scan: Scan = getScan(rowkeys, filterMode, filterCF, filterCol, filterValue)
        // Select the families/columns to read.
        if (cfList.nonEmpty && columnList.isEmpty) {
            for (cf <- cfList) {
                scan.addFamily(Bytes.toBytes(cf))
            }
        } else {
            for ((cf, columns) <- cfList.zip(columnList)) {
                scan.addFamily(Bytes.toBytes(cf))
                for (c <- columns)
                    scan.addColumn(Bytes.toBytes(cf), Bytes.toBytes(c))
            }
        }
        jobConf.set(TableInputFormat.INPUT_TABLE, table)
        // The scan must be serialized into the configuration for TableInputFormat.
        jobConf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
        val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(
            jobConf,
            classOf[TableInputFormat],
            classOf[ImmutableBytesWritable],
            classOf[Result]
        )
        hbaseRDD
    }


    /**
     * Builds a Scan from the given row keys and optional column-value filter.
     *
     * @param rowkeys     length 1 -> prefix scan on rowkeys(0);
     *                    length >= 2 -> range scan [rowkeys.head, rowkeys.last)
     *                    (stop row exclusive); empty -> full scan
     * @param filterMode  "ID" means rowkeys(0) is an id prefix, otherwise a
     *                    time string (prefix semantics are identical either way)
     * @param filterCF    column family of the value filter
     * @param filterCol   column of the value filter — note: a filtered column
     *                    must also be among the columns being read or the
     *                    filter will not take effect
     * @param filterValue value the filter matches
     */
    def getScan(rowkeys: List[String],
                filterMode: String = "TIME",
                filterCF: String = "",
                filterCol: String = "",
                filterValue: String = ""): Scan = {
        val scan: Scan = new Scan()
        // Prefix scan for one key, range scan for two (stop row exclusive).
        // FIX: use Bytes.toBytes instead of charset-dependent String.getBytes.
        if (rowkeys.length == 1)
            scan.setRowPrefixFilter(Bytes.toBytes(rowkeys.head))
        else if (rowkeys.length >= 2) {
            // NOTE: setStartRow/setStopRow are deprecated in newer HBase
            // clients in favor of withStartRow/withStopRow; kept for
            // compatibility with the client version this project builds against.
            scan.setStartRow(Bytes.toBytes(rowkeys.head))
            scan.setStopRow(Bytes.toBytes(rowkeys.last))
        }
        // Optional single-column value filter.
        if (filterCF != "" && filterCol != "" && filterValue != "") {
            val filter: SingleColumnValueFilter = HBaseFilter.getColumnFilter(
                filterCF, filterCol, filterValue)
            scan.setFilter(filter)
        }

        scan
    }


    /**
     * Deletes the rows corresponding to `result` from `table`. Row keys are
     * derived with the same rule as [[writeEvent]]. Failures are logged and
     * swallowed (best-effort delete, matching the original behavior).
     */
    def delete(hbaseParams: Map[String, String],
               result: RDD[Map[String, String]],
               table: String,
               rowkeyRule: String): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
        var conn: Connection = null
        try {
            conn = ConnectionFactory.createConnection(jobConf)
            val tb: Table = conn.getTable(TableName.valueOf(table))
            try {
                val delList = result.map(x => {
                    val record = x - "rid" - "type"
                    val rowKey = FuncUtil.getRowKey(record, rowkeyRule)
                    // Build the Delete for this row.
                    new Delete(Bytes.toBytes(rowKey))
                }).collect().toList.asJava
                // FIX: Table.batch requires a non-null results array; passing
                // null throws in recent HBase client versions.
                tb.batch(delList, new Array[AnyRef](delList.size()))
            } finally {
                tb.close()
            }
            println(s"Delete from hbase table: ${table} successfully!")
        }
        catch {
            case e: Throwable => println(s"Delete from hbase table: ${table} failed!"); e.printStackTrace()
        }
        finally {
            // FIX: the original leaked the connection when an exception was
            // thrown before conn.close(); always close it here.
            if (conn != null) conn.close()
        }
    }


    /**
     * Creates `tableName` with the given column families (SNAPPY compression,
     * 30-day TTL) if it is not already available; otherwise does nothing.
     */
    def createTable(hbaseParams: Map[String, String], tableName: String,  familyList: List[String]): Unit = {
        setConf(hbaseParams)
        jobConf.set(TableInputFormat.INPUT_TABLE, tableName)
        // NOTE: HBaseAdmin(Configuration) is deprecated; kept to avoid changing
        // the connection handling, but the admin is now closed after use.
        val admin = new HBaseAdmin(jobConf)
        try {
            if (!admin.isTableAvailable(tableName)) {
                println(s"table:${tableName} is not exist!")
                // FIX: use the TableName-based descriptor instead of the
                // deprecated String constructor.
                val tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName))
                for (family <- familyList) {
                    val hcd: HColumnDescriptor = new HColumnDescriptor(Bytes.toBytes(family))
                    hcd.setCompressionType(Compression.Algorithm.SNAPPY)  // SNAPPY compression
                    hcd.setTimeToLive(30 * 24 * 60 * 60)  // 30-day TTL
                    tableDescriptor.addFamily(hcd)
                }
                admin.createTable(tableDescriptor)
                println(s"table:${tableName} created!")
            } else {
                println("Table Exists! not Create Table")
            }
        } finally {
            // FIX: the original never closed the admin, leaking its resources.
            admin.close()
        }
    }
}
